package com.apobates.forum.grief.stream;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Forks a single {@link Stream} so that several independent operations can
 * each consume the same sequence of elements.
 *
 * <p>
 * Forks are registered with {@link #fork(Object, Function)} and are started
 * by {@link #getResults()}. Each fork function is applied, asynchronously, to
 * its own stream that is backed by a dedicated {@link LinkedBlockingQueue};
 * the source stream is then traversed once and its elements are handed to a
 * {@code ForkingStreamConsumer} that was built over all of those queues.
 *
 * <p>
 * Instances keep their forks in a plain {@link HashMap} and perform no
 * synchronization of their own, so a forker should be configured and used
 * from a single thread.
 *
 * @param <T> the type of elements output from the forked stream.
 * @author xiaofanku
 */
public class StreamForker<T> {
    /** The source stream; it is traversed once, by {@link #getResults()}. */
    private final Stream<T> stream;
    /** The registered fork functions, indexed by the key supplied by the caller. */
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

    /**
     * Create a forker for the given stream.
     *
     * @param stream the stream whose elements are to be shared by the forks
     * @throws NullPointerException if {@code stream} is {@code null}
     */
    public StreamForker(Stream<T> stream) {
        this.stream = Objects.requireNonNull(stream, "stream");
    }

    /**
     * Add a fork
     *
     * <p>
     * Registering a second function under a key that is already in use
     * replaces the previously registered function.
     *
     * @param key the key under which the fork is registered (and under which
     * its pending result is handed to the results object)
     * @param f the fork function to apply to a forked stream. The stream is
     * forked, and the function is applied to that forked stream to produce a
     * result.
     * @return this forker, so that calls can be chained
     * @throws NullPointerException if {@code f} is {@code null}
     */
    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        // Fail here rather than later inside the asynchronous task, where the
        // failure would only surface when the fork's result is requested.
        Objects.requireNonNull(f, "f");
        forks.put(key, f);
        return this;
    }

    /**
     * Fork a stream
     *
     * <p>
     * The stream will be forked N times where N is the number of forks added.
     * All fork functions are started first; the source stream is then consumed
     * sequentially on the calling thread.
     *
     * <p>
     * A stream can be traversed only once, so this method is expected to fail
     * if it is invoked a second time on the same forker.
     *
     * @return the results of forking
     */
    public ForkedStreamResults getResults() {
        // @@@ Obtain the spliterator from the stream so as to
        // get the characteristics that can be passed to the
        // LinkedBlockingQueueSpliterator, then re-create the
        // to-be-forked stream from that Spliterator

        ForkingStreamConsumer<T> consumer = build();
        try {
            // @@@ If the stream is parallel then the encounter order,
            // if any, will not be preserved, in addition the parallel
            // execution will compete with the execution of the forked
            // streams
            stream.sequential().forEach(consumer);
        } finally {
            // Always invoked, even when the traversal throws.
            // NOTE(review): presumably this tells the forked streams that no
            // more elements will arrive - confirm in ForkingStreamConsumer.
            consumer.finish();
        }
        return consumer;
    }

    /**
     * Start every registered fork and build the consumer that feeds them.
     *
     * @return a consumer created from the queue of every fork and from the
     * pending result of every fork, indexed by fork key
     */
    private ForkingStreamConsumer<T> build() {
        List<LinkedBlockingQueue<T>> queues = new ArrayList<>();
        Map<Object, CompletableFuture<?>> actions = new HashMap<>();
        // A plain loop is used on purpose: Stream.reduce requires a true
        // identity value and side-effect-free functions, which a shared
        // mutable map does not satisfy.
        for (Map.Entry<Object, Function<Stream<T>, ?>> fork : forks.entrySet()) {
            actions.put(fork.getKey(), getOperationResult(queues, fork.getValue()));
        }
        return new ForkingStreamConsumer<>(queues, actions);
    }

    /**
     * Create the queue-backed stream for one fork and start applying the fork
     * function to it asynchronously.
     *
     * @param queues the list to which the new fork's queue is appended
     * @param f the fork function to run
     * @return the pending result of applying {@code f} to the forked stream
     */
    private CompletableFuture<?> getOperationResult(List<LinkedBlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
        LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        Spliterator<T> sp = new LinkedBlockingQueueSpliterator<>(queue);
        // The forked stream is created as a sequential (non-parallel) stream.
        Stream<T> source = StreamSupport.stream(sp, false);
        // No executor is passed, so the task runs on CompletableFuture's
        // default asynchronous executor (normally ForkJoinPool.commonPool()).
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }
}
